#include "vet_cmd_handler.h"
#include "vet_protobuf_convertor.h"
#include "vet_system_status.h"

#include<math.h>

VET_INDEX_NAMESPACE_BEGIN

std::string CmdHandler::VetKeyFormat (
        const VetReqCnt& o_vet_req ,
        uint32_t ui_no_vet ,
        uint32_t ui_dim) {
    std::string s_vet_data;

    std::ostringstream stream;
    stream.precision(VET_VALUE_PRECISION);

    uint64_t ui_start = ui_no_vet * ui_dim;
    for (size_t i = ui_start; i < ui_start + ui_dim; i++) {
        stream << o_vet_req.o_vet_data_[i];
        s_vet_data += (stream.str() + "|");
        stream.str("");
    }
    
    return std::to_string(o_vet_req.o_idx_flag_.i_appid_) +"#" 
            + std::to_string(o_vet_req.o_idx_flag_.i_field_id_) + "#" 
            + s_vet_data;
};

CmdHandler::CmdHandler(CPollThread* p_thread)
    : CTaskDispatcher<CTaskRequest>(p_thread)
    , p_vet_idx_(IndexFactory::Instance())
    , p_rocksdb_(RocksDbOperator::Instance())
    , p_app_field_table_(const_cast<AppFieldContext*>(
                    ConfigManager::Instance()->GetAppFieldTable()))
{

}

CmdHandler::~CmdHandler()
{

}

void CmdHandler::set_reply_info(
    CTaskRequest* p_task_req ,
    const VetRspCnt& o_vet_rsp_cnt)
{
    std::string s_result_buf;
    bool b_ret = PbCvtor::ConvertVetRspCnt2PbStr(o_vet_rsp_cnt , s_result_buf);
    if (!b_ret) {
        s_result_buf = "pb convert has some errors";
    }

    p_task_req->setResult(s_result_buf);
    p_task_req->ReplyNotify();
}

void CmdHandler::TaskNotify(CTaskRequest* p_task_req)
{
    log_debug("CmdHandler::TaskNotify start");

    VetRspCnt o_vet_rsp_cnt;

    if (NULL == p_task_req) {
        o_vet_rsp_cnt(-1 , "CTaskRequest is null");
        set_reply_info(p_task_req , o_vet_rsp_cnt);
        return;
    }
    
    VetReqCnt o_vet_req;
    bool b_ret = PbCvtor::ConvertPbStr2VetReqCnt(
                p_task_req->buildRequsetString(), o_vet_req);
    if (!b_ret) {
        o_vet_rsp_cnt(-1 , "convert request pb to ram has error");
        set_reply_info(p_task_req , o_vet_rsp_cnt);
        return;
    }

    if (!p_app_field_table_->ValidityCheck(
            o_vet_req.o_idx_flag_.i_appid_ ,
            o_vet_req.o_idx_flag_.i_field_id_)) {
        o_vet_rsp_cnt(-1 , "(appid + fieldid) not existed in config");
        set_reply_info(p_task_req , o_vet_rsp_cnt);
        return;
    }
    
    if (!o_vet_req.o_idx_flag_.b_has_index_typeid_) {
        o_vet_req.o_idx_flag_.i_index_typeid_ = p_app_field_table_->GetValue(o_vet_req.o_idx_flag_.i_appid_,
                    o_vet_req.o_idx_flag_.i_field_id_).ui_search_index_type_;
    }
    
    b_ret = p_vet_idx_->CheckUniIdxIsValid(o_vet_req.o_idx_flag_);
    if (!b_ret) {
        log_error("appid:%d , fieldid:%d do not support vector index" , 
                        o_vet_req.o_idx_flag_.i_appid_,
                        o_vet_req.o_idx_flag_.i_field_id_);
        o_vet_rsp_cnt(-1 , "(appid + fieldid) do not support vector index");
        set_reply_info(p_task_req , o_vet_rsp_cnt);
        return;
    }

    log_debug("appid:%d , fieldid:%d , index_typeid:%d" , o_vet_req.o_idx_flag_.i_appid_ , o_vet_req.o_idx_flag_.i_field_id_ , o_vet_req.o_idx_flag_.i_index_typeid_);

    uint32_t ui_dim = p_app_field_table_->GetValue(o_vet_req.o_idx_flag_.i_appid_,
                            o_vet_req.o_idx_flag_.i_field_id_).ui_dim_;
    
    b_ret = o_vet_req.CheckIsValid(ui_dim);
    if (!b_ret) {
        o_vet_rsp_cnt(-1 , "vector request context is illegal");
        set_reply_info(p_task_req , o_vet_rsp_cnt);
        return;
    }

    switch (p_task_req->GetReqCmd())
    {
    case SERVICE_VECTOR_ADD:
        {
            RocksDbOperator::UpdatePair o_update_vet;
            int i_vet_num = (o_vet_req.o_vet_data_.size() / ui_dim);
            std::set<int> o_existed_vet_seqid;
            for (int i = 0;  i < i_vet_num; i++) {
                std::string s_vet_key = std::move(VetKeyFormat(o_vet_req , i , ui_dim));
                std::string s_vet_id;
    
                int i_ret = p_rocksdb_->GetEntry(
                                    s_vet_key,
                                    s_vet_id,
                                    E_COL_FAM_VECTOR_DATA);
                if (0 == i_ret) {
                    o_existed_vet_seqid.insert(i);
                    o_vet_rsp_cnt.vet_ids.emplace_back((int64_t)strtoll(s_vet_id.c_str(),NULL,10));
                    log_debug("vector:[%s] has existed in rocksdb storage" , s_vet_key.c_str());
                    continue;
                }
    
                s_vet_id = SystemState::Instance()->GetVetId();
                o_update_vet.emplace_back(std::make_pair(s_vet_key , s_vet_id));

                i_ret = SystemState::Instance()->AutoIncVetId();
                if (i_ret != 0) {
                    o_vet_rsp_cnt(-1 , "refresh vet id has errors");
                    set_reply_info(p_task_req , o_vet_rsp_cnt);
                    return;
                }
            }

            if (o_update_vet.empty() && 
                ((int)o_vet_rsp_cnt.vet_ids.size() == i_vet_num) ) {
                o_vet_rsp_cnt(0 , "add vector success");
                set_reply_info(p_task_req , o_vet_rsp_cnt);
                return;
            }
            
            std::vector<int64_t> o_ids;
            for (size_t i = 0; i < o_update_vet.size(); i++) {
                o_ids.emplace_back((int64_t)strtoll(o_update_vet[i].second.c_str(),NULL,10) );
            }

            uint32_t ui_ids_idx = 0;
            for (int i = 0; i < i_vet_num; i++) {
                if (o_existed_vet_seqid.find(i) == o_existed_vet_seqid.end()
                && ui_ids_idx < o_update_vet.size()) {
                    o_vet_req.o_idx_flag_.i_index_typeid_ = 0;
                    int i_total_num = p_vet_idx_->GetIdxNum(o_vet_req.o_idx_flag_);

                    for (int i_idx = 0; i_idx < i_total_num; i_idx++) {
                        o_vet_req.o_idx_flag_.i_index_typeid_ = i_idx;
                        int i_ret = (*p_vet_idx_)[o_vet_req.o_idx_flag_]->InsertVet(
                                o_vet_req.o_vet_data_.data() + i * ui_dim,
                                &o_ids[ui_ids_idx]);
                        if (-1 == i_ret) {
                            // need remove added vector
                            for (int i_roll_idx = 0; i_roll_idx <= i_idx; i_roll_idx++) {
                                o_vet_req.o_idx_flag_.i_index_typeid_ = i_roll_idx;
                                i_ret = (*p_vet_idx_)[o_vet_req.o_idx_flag_]->RemoveVet(
                                    &o_ids[ui_ids_idx]);
                            }

                            uint32_t ui_total_undo_ids_num = (ui_ids_idx - 1);
                            for (int i_roll_idx = 0; 
                                (i_roll_idx < i_total_num)
                                && (ui_ids_idx <= ui_total_undo_ids_num)
                                && (ui_ids_idx >= 0);
                                ++i_roll_idx , --ui_ids_idx) {
                                o_vet_req.o_idx_flag_.i_index_typeid_ = i_roll_idx;
                                i_ret = (*p_vet_idx_)[o_vet_req.o_idx_flag_]->RemoveVet(
                                    &o_ids[ui_ids_idx]);
                            }
                            
                            o_vet_rsp_cnt.vet_ids.clear();
                            o_vet_rsp_cnt(-1 , "add vector [faiss insert entry] has errors");
                            set_reply_info(p_task_req , o_vet_rsp_cnt);
                            return;
                        }
                    }
                    o_vet_rsp_cnt.vet_ids.emplace_back(o_ids[ui_ids_idx]);
                    ++ ui_ids_idx;
                }
            }

            std::set<std::string> o_del_keys_set;
            int i_ret = p_rocksdb_->BatchUpdate(
                                    o_del_keys_set,
                                    o_update_vet,
                                    false,
                                    E_COL_FAM_VECTOR_DATA);
            if (i_ret != 0) {
                o_vet_rsp_cnt(-2 , "add vector [rocksdb batch update] has errors");
                break;
            }

            if ((int)o_vet_rsp_cnt.vet_ids.size() == i_vet_num) {
                o_vet_rsp_cnt(0 , "add vector success");
            }
        }
        break;
    case SERVICE_VECTOR_DELETE:
        {
            // need Atomic batch delete
            if (!(*p_vet_idx_)[o_vet_req.o_idx_flag_]->GetDeleEnableFlag()) {
                o_vet_rsp_cnt(-1 , "do not support delete");
                break;
            } else {
                log_debug("support delete");
            }
            
            std::set<std::string> o_del_keys_set;
            uint64_t ui_del_num = (o_vet_req.o_vet_data_.size() / ui_dim);

            for (size_t i = 0; i < ui_del_num; ++i) {
                o_del_keys_set.insert(std::move(VetKeyFormat(o_vet_req , i , ui_dim)));
            }

            std::vector<int> col_fam_type_vet(o_del_keys_set.size() , E_COL_FAM_VECTOR_DATA);
            std::vector<std::string> o_del_keys_vet(o_del_keys_set.begin() , o_del_keys_set.end());
            std::vector<std::string> values;
            std::set<int> error_idx;
            std::set<int> no_found_idx;
            bool b_ret = p_rocksdb_->MultiGetEntry(
                                        col_fam_type_vet ,
                                        o_del_keys_vet , 
                                        values,
                                        error_idx,
                                        no_found_idx);
            if (!b_ret) {
                o_vet_rsp_cnt(-1 , "delete vector [rocksdb mutigetentry] has errors");
                break;
            }
            
            int i_ret = p_rocksdb_->BatchUpdate(o_del_keys_set);
            if (i_ret != 0  && i_ret != -1) {
                o_vet_rsp_cnt(-1 , "delete vector [rocksdb batchupdate] has errors");
                break;
            }
            
            std::vector<int64_t> o_ids_vet;
            for (size_t i = 0; i < values.size(); i++) {
                if (no_found_idx.find(i) == no_found_idx.end()) {
                    if (!values[i].empty()) {
                        o_ids_vet.emplace_back((int64_t)strtoll(values[i].c_str(),NULL,10));
                    }
                }
            }
            
            if (!o_ids_vet.empty()) {
                i_ret = (*p_vet_idx_)[o_vet_req.o_idx_flag_]->RemoveVet(
                                    o_ids_vet.data(),
                                    o_ids_vet.size());
                if (i_ret != (int)o_ids_vet.size()) {
                    o_vet_rsp_cnt(-1 , "delete vector [faiss remove vet] has errors");
                    break;
                }
            }
            
            o_vet_rsp_cnt(0 , "delete vector success");
        }
        break;
    case SERVICE_VECTOR_QUERY:
        {
            uint64_t ui_query_num = (o_vet_req.o_vet_data_.size() / ui_dim);
            std::vector<int64_t> o_ids_vet(o_vet_req.ui_top_k_ * ui_query_num);
            std::vector<float> o_dis_vet(o_vet_req.ui_top_k_ * ui_query_num);

            int i_ret = (*p_vet_idx_)[o_vet_req.o_idx_flag_]->SearchVet(
                                o_vet_req.o_vet_data_.data(),
                                o_vet_req.ui_top_k_,
                                o_ids_vet.data(),
                                o_dis_vet.data(),
                                ui_query_num);
            
            if (-1 == i_ret) {
                o_vet_rsp_cnt(-1 , "search vector has some errors");
                break;
            }
            
            o_vet_rsp_cnt(0 , "search vector success");
            o_vet_rsp_cnt.vet_ids.swap(o_ids_vet);
            o_vet_rsp_cnt.vet_dis.swap(o_dis_vet);
        }
        break;
    case SERVICE_VECTOR_QUERY_ID:
        {
            std::set<std::string> o_search_keys_set;
            uint64_t ui_del_num = (o_vet_req.o_vet_data_.size() / ui_dim);

            for (size_t i = 0; i < ui_del_num; ++i) {
                o_search_keys_set.insert(std::move(VetKeyFormat(o_vet_req , i , ui_dim)));
            }

            std::vector<int> col_fam_type_vet(o_search_keys_set.size() ,
                             E_COL_FAM_VECTOR_DATA);
            std::vector<std::string> o_search_keys_vet(o_search_keys_set.begin() ,
                             o_search_keys_set.end());
            std::vector<std::string> values;
            std::set<int> error_idx;
            std::set<int> no_found_idx;
            bool b_ret = p_rocksdb_->MultiGetEntry(
                                        col_fam_type_vet ,
                                        o_search_keys_vet ,
                                        values,
                                        error_idx,
                                        no_found_idx);
            if (!b_ret) {
                o_vet_rsp_cnt(-1 , "id search [rocksdb mutigetentry] has errors");
                break;
            }

            for (size_t i = 0; i < values.size(); i++) {
                if (!values[i].empty() 
                && no_found_idx.find(i) == no_found_idx.end()) {
                    o_vet_rsp_cnt.vet_ids.emplace_back((int64_t)strtoll(values[i].c_str(),NULL,10));
                } else if (no_found_idx.find(i) != no_found_idx.end()) {
                    o_vet_rsp_cnt.vet_ids.emplace_back(-1);
                }
            }

            o_vet_rsp_cnt(0 , "id search success");
        }
        break;
    default:
        break;
    }

    set_reply_info(p_task_req , o_vet_rsp_cnt);
    return;
}

VET_INDEX_NAMESPACE_END
